Source code for hysop.tools.hysop_ls

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import sys, os, argparse, tempfile, warnings, subprocess

# Default location of the hardware topology cache: the per-user HySoP cache
# directory is preferred when it already exists, otherwise the cache lives
# under the system temporary directory.
tmp = tempfile.gettempdir()
_user_hysop = os.path.expanduser("~/.cache/hysop")
default_cache_dir = (
    _user_hysop + "/hardware"
    if os.path.isdir(_user_hysop)
    else tmp + "/hysop/hardware"
)


class BackendMask:
    """Utility class storing which computing backends should be reported.

    The mask is built from a single comma separated specification such as
    ``"host,~cuda"``. Tokens are case-insensitive, surrounding whitespace is
    ignored and tokens are applied from left to right:

    * ``host``, ``opencl``, ``cuda`` enable the corresponding backend;
    * ``device`` enables both ``opencl`` and ``cuda``;
    * ``all`` (or ``~none``) enables everything;
    * ``none`` (or ``~all``) disables everything;
    * a leading ``~`` disables the named backend(s) instead of enabling them.

    Parameters
    ----------
    backends:
        Sequence containing exactly one string (the shape produced by an
        argparse option declared with ``nargs=1``).

    Notes
    -----
    An error message is printed and ``sys.exit(2)`` is called when the
    specification contains no backend at all or an unknown backend name.
    """

    # Backend flags driven by each recognised (un-negated) name.
    _TARGETS = {
        "host": ("host",),
        "opencl": ("opencl",),
        "cuda": ("cuda",),
        "device": ("opencl", "cuda"),
        "all": ("host", "opencl", "cuda"),
    }

    def __init__(self, backends):
        super().__init__()
        assert len(backends) == 1 and isinstance(backends[0], str)

        # str.split never returns an empty list, so blank tokens have to be
        # discarded before checking that at least one backend was given.
        tokens = [tok.strip().lower() for tok in backends[0].split(",")]
        tokens = [tok for tok in tokens if tok]
        if not tokens:
            msg = "At least one backend should be given."
            print(msg)
            sys.exit(2)

        state = {"host": False, "opencl": False, "cuda": False}
        for tok in tokens:
            negated = tok.startswith("~")
            name = tok[1:] if negated else tok
            if name == "none":
                # "none" is the negation of "all" ("~none" therefore enables all).
                name, negated = "all", not negated
            targets = self._TARGETS.get(name)
            if targets is None:
                msg = f"Unknown backend {tok}. Aborting."
                print(msg)
                sys.exit(2)
            for target in targets:
                state[target] = not negated

        self.host = state["host"]
        self.opencl = state["opencl"]
        self.cuda = state["cuda"]

    def __str__(self):
        msg = "Backend mask is host={}, opencl={}, cuda={}."
        return msg.format(self.host, self.opencl, self.cuda)
class BlackWhiteList:
    """Name filter built from a comma separated white/black list.

    Entries prefixed with ``~`` are blacklisted, all other entries are
    whitelisted. Entries are compared case-insensitively with surrounding
    whitespace removed; empty entries are ignored.

    A name is accepted when it is in the whitelist (or the whitelist is
    empty) and it is not in the blacklist.

    Parameters
    ----------
    name:
        Human readable name of the filtered objects, used in messages.
    objs:
        ``None`` (no filtering) or a sequence containing exactly one string
        (the shape produced by an argparse option declared with ``nargs=1``).

    Notes
    -----
    An error message is printed and ``sys.exit(2)`` is called when an entry
    is both whitelisted and blacklisted.
    """

    def __init__(self, name, objs):
        obj_whitelist = set()
        obj_blacklist = set()
        if objs is not None:
            assert len(objs) == 1 and isinstance(objs[0], str)
            for token in objs[0].split(","):
                token = token.strip().lower()
                if token.startswith("~"):
                    target, entry = obj_blacklist, token[1:].strip()
                else:
                    target, entry = obj_whitelist, token
                # An empty entry would otherwise turn an empty specification
                # into a whitelist that rejects every real name.
                if entry:
                    target.add(entry)
            conflicts = obj_whitelist & obj_blacklist
            if conflicts:
                msg = "Intersection bewteen {} black and whitelist: {}.".format(
                    name, ", ".join(sorted(conflicts))
                )
                print(msg)
                sys.exit(2)

        def obj_mask(x):
            # Closes over the two sets built above.
            return ((not obj_whitelist) or (x in obj_whitelist)) and (
                x not in obj_blacklist
            )

        self.name = name
        self.obj_whitelist = obj_whitelist
        self.obj_blacklist = obj_blacklist
        self.obj_mask = obj_mask

    def __call__(self, name):
        """Return True when *name* passes the filter."""
        return self.obj_mask(name.strip().lower())

    def __str__(self):
        title = self.name.title()
        # Sorted so that the printed lists do not depend on set ordering.
        whitelist = ", ".join(sorted(self.obj_whitelist)) if self.obj_whitelist else None
        blacklist = ", ".join(sorted(self.obj_blacklist)) if self.obj_blacklist else None
        return f"{title} whitelist: {whitelist}\n{title} blacklist: {blacklist}"
class PlatformMask:
    """Utility class storing the platform and vendor filters.

    Both filters are ``BlackWhiteList`` instances; a (platform, vendor) pair
    is accepted when the platform filter and the vendor filter both accept
    their respective name.
    """

    def __init__(self, platforms, vendors):
        self.platforms = BlackWhiteList(name="platforms", objs=platforms)
        self.vendors = BlackWhiteList(name="vendors", objs=vendors)

    def __call__(self, platform_name, vendor_name):
        platform_ok = self.platforms(platform_name)
        if not platform_ok:
            # Short-circuit: the vendor filter is not consulted.
            return platform_ok
        return self.vendors(vendor_name)

    def __str__(self):
        return "\n".join((str(self.platforms), str(self.vendors)))
class DeviceMask:
    """Utility class storing the device and device type filters.

    Both filters are ``BlackWhiteList`` instances; a (device, device type)
    pair is accepted when the device filter and the device type filter both
    accept their respective name.
    """

    def __init__(self, devices, device_types):
        self.devices = BlackWhiteList(name="devices", objs=devices)
        self.device_types = BlackWhiteList(name="device types", objs=device_types)

    def __call__(self, device_name, device_type_name):
        device_ok = self.devices(device_name)
        if not device_ok:
            # Short-circuit: the device type filter is not consulted.
            return device_ok
        return self.device_types(device_type_name)

    def __str__(self):
        return "\n".join((str(self.devices), str(self.device_types)))
def _build_parser():
    """Build the ``hysop-ls`` command line parser."""
    description = (
        "List information about local or distant cluster topology prior to a run."
    )
    description += "\nHardware informations are gathered using hwloc (lstopo), pyopencl and pycuda."

    parser = argparse.ArgumentParser(prog="hysop-ls", description=description)
    parser.add_argument(
        "-hostfile",
        "--hostfile",
        nargs=1,
        type=str,
        default=None,
        required=False,
        help=(
            "Provide a list of hosts as a file. This file is parsed to extract host names "
            + "as if they would have been passed by '-H'."
        ),
        dest="hostfile",
    )
    parser.add_argument(
        "-H",
        "--host",
        nargs=1,
        type=str,
        default=None,
        required=False,
        dest="hosts",
        help=(
            "List of hosts to prospect. Defaults to localhost if no hostfile is provided. "
            + "If a hostfile is provided as well, exclude those hosts from hostfile."
        ),
    )
    parser.add_argument(
        "-x",
        nargs=1,
        type=str,
        default=None,
        required=False,
        metavar="var0;var1;...",
        dest="env",
        help=(
            "Provide a semicolon separated list of extra environment variables "
            + "to pass to hosts."
        ),
    )
    parser.add_argument(
        "-mca",
        "--mca",
        type=str,
        help="Pass MCA parameters.",
        dest="mca",
        default=None,
        required=False,
    )
    parser.add_argument(
        "-b",
        "--backend",
        nargs=1,
        type=str,
        default=("all",),
        required=False,
        metavar="[all,host,device,opencl,cuda,none]",
        dest="backends",
        help=(
            "Gather only informations on specified computing backends. "
            + "Defaults to all available backends. "
            + "If preceded by ~, disable this backend."
        ),
    )
    parser.add_argument(
        "-dt",
        "--device-type",
        nargs=1,
        type=str,
        default=None,
        required=False,
        metavar="[all,cpu,gpu,acc]",
        dest="device_types",
        help=(
            "Gather only informations on specified device types. "
            + "Defaults to all. "
            + "If preceded by ~, disable this device type."
        ),
    )
    parser.add_argument(
        "-p",
        "--platform",
        nargs=1,
        type=str,
        default=None,
        required=False,
        metavar="plat0,plat1,...",
        dest="platforms",
        help=(
            "Print only informations about the given platforms."
            + "If preceded by ~, disable this platform."
        ),
    )
    parser.add_argument(
        "-d",
        "--device",
        nargs=1,
        type=str,
        default=None,
        required=False,
        metavar="dev0,dev1,...",
        dest="devices",
        help=(
            "Print only informations about the given devices."
            + "If preceded by ~, disable this device."
        ),
    )
    parser.add_argument(
        "-vd",
        "--vendor",
        nargs=1,
        type=str,
        default=None,
        required=False,
        metavar="vendor0,vendor1,...",
        dest="vendors",
        help=(
            "Print only informations about the given vendor."
            + "If preceded by ~, disable this vendor."
        ),
    )
    parser.add_argument(
        "--pci-ids",
        nargs=1,
        type=str,
        required=False,
        dest="pciids",
        default=(None,),
        help="Specify a path to pci.ids. Should be shared between all scanned nodes.",
    )
    parser.add_argument(
        "--cache-dir",
        nargs=1,
        type=str,
        required=False,
        dest="cache_dir",
        default=(default_cache_dir,),
        help=f"Cache node topology results into this directory. Defaults to {default_cache_dir}.",
    )
    parser.add_argument(
        "--cache-host",
        nargs=1,
        type=str,
        default=("localhost",),
        required=False,
        dest="cache_host",
        help=(
            "Host that will cache the results into cache_dir. Defaults to localhost."
        ),
    )
    parser.add_argument(
        "--override-cache",
        help="Override cached node informations.",
        action="store_true",
        default=False,
        dest="override_cache",
    )
    parser.add_argument(
        "-v",
        "--version",
        help="Print the version of this executable and exit.",
        action="store_true",
        default=False,
        dest="print_version",
    )
    parser.add_argument(
        "-V",
        "--verbose",
        help="Increase output verbosity.",
        action="store_true",
        default=False,
        dest="verbose",
    )
    parser.add_argument(
        "-D",
        "--debug",
        help="Print debugging information.",
        action="store_true",
        default=False,
        dest="debug",
    )
    return parser


def _split_host_list(value):
    """Split a comma separated host list into a set, dropping empty entries."""
    return {host.strip() for host in value.split(",") if host.strip()}


def _resolve_hosts(args, verbose):
    """Return the tuple of hosts selected by ``--hostfile`` and ``-H``.

    With a hostfile, the first whitespace separated field of every non-blank
    line is a host and the hosts given with ``-H`` are excluded from that
    set (the result may be empty). Without a hostfile, the hosts given with
    ``-H`` are used, and ``("localhost",)`` when ``-H`` is absent as well.

    Raises OSError when the hostfile does not exist and ValueError when
    ``-H`` is given without any host name.
    """
    requested = None if args.hosts is None else _split_host_list(args.hosts[0])

    if args.hostfile is not None:
        hostfile = args.hostfile[0]
        if not os.path.isfile(hostfile):
            msg = f"Hostfile '{hostfile}' does not exist."
            raise OSError(msg)
        from_file = set()
        with open(hostfile) as f:
            for line in f:
                # split() without argument skips blank lines and leading
                # whitespace, so no empty host name is ever recorded.
                fields = line.split()
                if fields:
                    from_file.add(fields[0])
        if requested:
            from_file -= requested
        return tuple(sorted(from_file))

    if requested is not None:
        if not requested:
            msg = "No hosts specified."
            raise ValueError(msg)
        return tuple(sorted(requested))

    if verbose:
        msg = "No host specified, using localhost."
        print(msg)
    return ("localhost",)


def run(arguments=None):
    """Entry point of the ``hysop-ls`` command line tool.

    Parses the command line, determines which hosts are not yet present in
    the topology cache, collects their hardware topology through an
    ``mpirun`` sub-command calling ``collect_node_informations``, then loads
    the cache back and prints a topology report for all requested hosts.

    Parameters
    ----------
    arguments:
        Command line arguments forwarded to ``ArgumentParser.parse_args``.

    Raises
    ------
    OSError
        If the given hostfile does not exist.
    ValueError
        If ``-H`` is given without any host name.
    RuntimeError
        If the ``mpirun`` command fails.
    SystemExit
        Always raised on completion: status 0 after the report is printed,
        1 after ``--version``, 2 when no host remains, or the ``errno`` of
        the failure when the cache directory cannot be created.
    """
    # parse arguments (also handle help)
    args = _build_parser().parse_args(arguments)

    # print version and exit if required
    if args.print_version:
        from hysop import version

        print(f"hysop-ls version {version}.")
        # NOTE(review): exits with status 1 although printing the version
        # succeeded; kept as is, confirm this is intended.
        sys.exit(1)

    verbose = args.verbose
    debug = args.debug

    # hostfile + hosts
    hosts = _resolve_hosts(args, verbose)
    if not hosts:
        msg = "Failed to parse hostfile or no hosts present, aborting."
        print(msg)
        sys.exit(2)
    if verbose:
        msg = "Hosts are {}.".format(", ".join(hosts))
        print(msg)

    # caching
    cache_dir = args.cache_dir[0]
    cache_file = f"{cache_dir}/nodes.pklz"
    try:
        os.makedirs(cache_dir, exist_ok=True)
    except OSError as e:
        msg = f"Could not create cache directory:\n {e}."
        print(msg)
        sys.exit(e.errno)
    if verbose:
        print(f"Caching directory is '{cache_dir}'.")

    # checking for already cached hosts
    from hysop.tools.cache import load_cache

    # all_hosts must be bound whether or not the cache is consulted: it is
    # needed to select the reported topologies at the end.
    all_hosts = hosts
    if not args.override_cache:
        cached_hosts = set(hosts).intersection(load_cache(cache_file).keys())
        hosts = tuple(h for h in hosts if h not in cached_hosts)
        if verbose and cached_hosts:
            msg = "The following hosts have already been cached: {}"
            msg = msg.format(", ".join(sorted(cached_hosts)))
            msg += "\nUse --override-cache to overwrite cached data if required."
            print(msg)

    # connect to nodes by spawning MPI processes on the fly
    if verbose:
        if hosts:
            print("Retrieving distant node hardware topologies...")
        else:
            print("Retrieving all harware topologies from cache...")

    if hosts:
        # The caching host comes first in the host list and the remaining
        # entries follow the order of the hostnames tuple passed below.
        hostlist = args.cache_host[0] + "," + ",".join(hosts)
        cmd = ["mpirun", "-H", hostlist]
        if args.env is not None:
            # dict.fromkeys removes duplicates while keeping the given order
            for var in dict.fromkeys(args.env[0].split(";")):
                cmd += ["-x", var]
        if args.mca is not None:
            for mca in args.mca.split(";"):
                cmd += ["-mca", mca]
        pciids = args.pciids[0]
        hostnames = "({},)".format(",".join(f'"{h}"' for h in hosts))
        fcall = 'collect_node_informations(cache_file="{}", hostnames={}, pciids={})'.format(
            cache_file, hostnames, None if (pciids is None) else f'"{pciids}"'
        )
        cmd += [
            "--",
            f"python -c 'from hysop.tools.hysop_ls import collect_node_informations; {fcall}'",
        ]
        # NOTE(review): the command is a shell string built from user
        # supplied options (hosts, -x, -mca, paths) and run with shell=True;
        # these values are not escaped, so quotes or shell metacharacters in
        # them alter the command.
        cmd = " ".join(cmd)
        if verbose:
            print(cmd)
            print("This may take some time...")
        with open(os.devnull, "w") as devnull:
            # debug: show everything, verbose: hide stderr, default: hide both
            stdout = None if (debug or verbose) else devnull
            stderr = None if debug else devnull
            try:
                subprocess.check_call(cmd, stdout=stdout, stderr=stderr, shell=True)
            except subprocess.CalledProcessError as e:
                msg = "Command\n {}\n failed with exit status {}."
                msg = msg.format(cmd, e.returncode)
                raise RuntimeError(msg) from e
        if verbose:
            print(f"Results have been cached to {cache_file}.")

    # load back cached hardware topologies
    if verbose:
        print("Loading topologies and computing requested statistics...")
    topologies = load_cache(cache_file)
    topologies = {k: topologies[k] for k in all_hosts}

    # filtering options
    backends = BackendMask(args.backends)
    platforms = PlatformMask(args.platforms, args.vendors)
    devices = DeviceMask(args.devices, args.device_types)
    if debug:
        print(backends)
        print(platforms)
        print(devices)

    from hysop.backend.hardware.hwinfo import TopologyStatistics

    stats = TopologyStatistics()
    for topo in topologies.values():
        stats += topo

    # NOTE(review): the line breaks of this report template are assumed
    # (title, host list and statistics on separate lines), confirm the
    # intended layout.
    msg = "*** HySoP Topology Report ***\nHosts: {}\n{}\n".format(
        ", ".join(topologies.keys()), stats.to_string(2, 2)
    )
    print(msg)
    sys.exit(0)
def collect_node_informations(cache_file, hostnames, pciids=None):
    """Gather node topologies over MPI and store them into the cache file.

    Meant to be executed by every process of an MPI job. Rank 0 gathers the
    values sent by the other ranks, discards its own slot and stores the
    remaining ones with ``update_cache``, pairing them with *hostnames* in
    rank order. Every other rank parses its topology with ``Topology.parse``
    and sends it to rank 0.

    Parameters
    ----------
    cache_file:
        Cache file forwarded to ``update_cache``.
    hostnames:
        Tuple of host names, one per non-root rank.
    pciids:
        Value forwarded to ``PCIIds`` (``None`` by default).

    Raises
    ------
    SystemExit
        Always raised with status 0 once the exchange is complete.

    Any exception raised while parsing the topology propagates unchanged.
    """
    from mpi4py import MPI
    from hysop.backend.hardware.hwinfo import PCIIds, Topology

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    assert isinstance(hostnames, tuple)

    if rank == 0:
        # The root contributes nothing: drop its own (first) slot.
        topologies = comm.gather(None, root=0)[1:]
        assert len(topologies) == len(hostnames)

        from hysop.tools.cache import update_cache

        for hostname, topo in zip(hostnames, topologies):
            update_cache(cache_file, hostname, topo)
    else:
        # NOTE(review): a rank that raises here never reaches comm.gather;
        # whether rank 0 then blocks depends on how the MPI launcher handles
        # the failed process, confirm.
        topo = Topology.parse(PCIIds(pciids))
        comm.gather(topo, root=0)
    sys.exit(0)
# Run the hysop-ls command line tool when this file is executed as a script.
if __name__ == "__main__": run()